#[allow(dead_code)]

use once_cell::sync::OnceCell;
use std::sync::{RwLock, RwLockWriteGuard, RwLockReadGuard, Arc};
use std::collections::HashMap;
use tokio::runtime::Runtime;
use futures::{future};
use crate::util::{AsyncSend, async_channel_new, AsyncRecv};
use async_trait::async_trait;
use crate::logical_plan::work_node::util::ErrResult;
use crate::data_frame::DataInstance;
use crate::conf_init::{EventFlowNodeConfInstance, MapResult, ResultErr};
use crate::source::file::{SourceFileConf, SourceFile};

mod file;

pub type SourceResult = Result<DataInstance,ResultErr>;

#[async_trait]
pub trait Source:Send+Sync{
    async fn recv(&mut self)->SourceResult;
}

#[derive(Deserialize)]
pub enum SourceWay{
    file(SourceFileConf),
}
#[derive(Deserialize,Clone)]
pub enum SourceFormat{
    json,
    arrow,
}
#[derive(Deserialize)]
pub struct SourceConf{
    name: String,
    threads: usize,
    way: SourceWay,
    format: SourceFormat,
}

pub struct SourceListConf{
    source: Vec<SourceConf>,
}
pub struct SourceInstance{
    name: String,
    next: HashMap<String,AsyncSend>,
    ctl_recv: AsyncRecv,
    source: Box<dyn Source>,
}
pub struct SourceCtl{
    name: String,
    runtime: Runtime,
    ctl_send: AsyncSend,
    source: Option<SourceInstance>,
}
lazy_static!{
    static ref SOURCE_CTL_LIST: RwLock<HashMap<String, SourceCtl>> = {
        match source_init_from_conf(&EventFlowNodeConfInstance.source){
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("source_init_from_conf failed, [{:?}]", e);
                std::process::exit(-1);
            },
        }
    };
}
fn source_init(way: &SourceWay, format: &SourceFormat)->MapResult<Box<dyn Source>>{
    match way{
        SourceWay::file(cnf) => {return SourceFile::new(format, cnf);},
    }
}
pub fn source_init_from_conf(conf: &Vec<SourceConf>)->MapResult<RwLock<HashMap<String,SourceCtl>>>{

    let mut source_ctl_list = HashMap::new();

    for source_conf in conf.iter(){
        let source_runtime = if let Ok(t) = tokio::runtime::Builder::new_multi_thread()
            .thread_name(format!("source_{}", source_conf.name))
            .worker_threads(source_conf.threads)
            .build(){
            t
        }else{
            let err_desc = format!("work_space [{}] runtime init failed", source_conf.name);
            return MapResult::Err(ResultErr::new(err_desc));
        };

        let (ctl_send,ctl_recv) = async_channel_new();

        let source = SourceInstance{
            name: source_conf.name.clone(),
            next: HashMap::new(),
            ctl_recv: ctl_recv,
            source: source_init(&source_conf.way, &source_conf.format)?,
        };
        let tmp_source_ctl = SourceCtl{
            name: source_conf.name.clone(),
            runtime: source_runtime,
            ctl_send: ctl_send,
            source: Some(source),
        };

        source_ctl_list.insert(source_conf.name.to_string(), tmp_source_ctl);
    }

    return MapResult::Ok(RwLock::new(source_ctl_list));
}

pub fn source_add_next(source_name: &str, next_name: &str)->MapResult<AsyncRecv>{
    let mut source_ctl_list = match SOURCE_CTL_LIST.try_write(){
        Ok(t) => {t},
        Err(e) => {
            let err_desc = format!("SOURCE_CTL_LIST try_write failed {:?}", e);
            return MapResult::Err(ResultErr::new(err_desc));
        },
    };

    if let Some(tmp) = source_ctl_list.get_mut(source_name){

        if let Some(ref mut next) = tmp.source{
            let (s,r) = async_channel_new();
            next.next.insert(next_name.to_string(), s);
            return MapResult::Ok(r);
        }else{
            let err_desc = format!("source [{}] add next [{}], source is None", source_name, next_name);
            return MapResult::Err(ResultErr::new(err_desc));
        }
    }else{
        let err_desc = format!("source [{}] not exist", source_name);
        return MapResult::Err(ResultErr::new(err_desc));
    }
}

pub fn source_run()->MapResult<()>{
    let mut source_ctl_list = match SOURCE_CTL_LIST.try_write(){
        Ok(t) => {t},
        Err(e) => {
            let err_desc = format!("SOURCE_CTL_LIST try_write failed {:?}", e);
            return MapResult::Err(ResultErr::new(err_desc));
        },
    };

    let mut source_name_list = Vec::new();

    source_ctl_list.iter().map(|(n,_)|{
        source_name_list.push(n.to_string());
    });

    for source_name in source_name_list.iter(){

        if let Some(mut source_ctl) = source_ctl_list.get_mut(source_name){
            if let Some(source) = source_ctl.source.take(){
                source_ctl.runtime.spawn(
                    source_input(source)
                );
            }else{
                error!("source [{}] source is None", source_name);
                std::process::exit(-1);
            }
        }else{
            error!("source [{}] not exist", source_name);
            std::process::exit(-1);
        }
    }

    return MapResult::Ok(());
}

async fn source_input(mut source: SourceInstance){

    loop{
        match source.source.recv().await{
            SourceResult::Ok(data) => {
                for (_,next) in source.next.iter(){
                    next.send(data.clone()).await;
                }
            },
            SourceResult::Err(e) => {
                ;
            },
        }
    }
}